package com.atguigu.chapter05.Source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 * @date 2021/3/3 14:19
 */
public class Flink03_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO Source - 从 Kafka 读

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        properties.setProperty("group.id", "fffffffffff");


        FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
                "flink0923",
                new SimpleStringSchema(),
                properties
        );

        env
                .addSource(kafkaSourceFunction)
                .print();


        env.execute();
    }
}
